
Rebalance shards when ingester status changes #6185

Merged
nadav-govari merged 18 commits into quickwit-oss:main from ncoiffier-celonis:ingester-status-rebased
Mar 11, 2026

Conversation

@ncoiffier-celonis
Collaborator

Description

Attempt to fix #6158

Following @guilload's suggestion here, this PR:

  • gossip the ingester status over chitchat
  • update the ingester pool when the ingester status changes
  • update the indexer pool too when the ingester status changes (to fix the "no open shard found" error on ingester shutdown)
  • have the control plane rebalance the shards when the ingester status changes

With this approach, even though we have a ~10s propagation delay before decommissioning, it is still possible to fail to ingest some documents if chitchat takes longer than expected to gossip the ingester status to the control plane.

Any feedback is welcome!!

How was this PR tested?

In addition to the unit and integration tests, I ran it against a local cluster with two indexers and observed that the number of errors reported in #6158 dropped from a few hundred to zero.

Other approaches

This PR is essentially identical to the branch guilload/ingester-status, rebased on main and with some additional bugfixes:

  • fix a bug where timeout_after was always 0, so the decommission wait never actually happened
  • update the ingester pool (not only the indexer pool) when the IngesterStatus changes
  • more unit and integration tests

@guilload
Member

guilload commented Mar 4, 2026

it is still possible to fail to ingest some documents if the chitchat takes longer than expected to gossip the ingester status to the control-plane.

Technically, the ingest router should just retry when that happens, and there should be a path for the router to open a new shard if the ingester being decommissioned was the only one to have shard(s) for this index. Is that not what you observed?

@guilload
Member

guilload commented Mar 4, 2026

@ncoiffier-celonis I'm giving you write access to the repo so next time you can push directly to this repo rather than to your fork. It makes it easier for me to check out your changes locally. Though I learnt how to use gh pr checkout <PR_NUMBER> in the meantime.

Member

@guilload left a comment

I think we're close but we need to fix a few issues.

}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
Member

nit: seems a bit overkill to me

Collaborator Author

@ncoiffier-celonis Mar 5, 2026

Addressed with 0c1c82b: I unified ClusterNode::for_test_with_ingester_status into ClusterNode::for_test

pub struct IngestController {
ingester_pool: IngesterPool,
pub(crate) ingester_pool: IngesterPool,
pub(crate) stats: IngestControllerStats,
Member

👍

let Some(mailbox) = weak_mailbox.upgrade() else {
return;
};
let mut trigger_rebalance = false;
Member

@ncoiffier-celonis please review this tricky logic thoroughly. I'm the initial author of this change and now I'm also reviewing it so I'm more likely to miss something. I could use a second pair of eyes.

Member

Yeah, this logic definitely needs a comment. Here, we're considering both indexers and ingesters. Indexers run indexing pipelines: when they're ready, they are ready to index, so we want to rebuild the indexing plan. Same thing when they leave.

In addition, we're considering ingesters. (Technically, all indexers are ingesters and vice versa, because we didn't want to expose users to a new service: "service" as in metastore, janitor, control plane, etc., not microservice as in router, ingester, debug info, etc.)

Ingesters have two levels of readiness. The first is the same as for indexers: "I'm up and running, I can connect to the metastore". The second: "I have loaded my WAL".

So we want to rebalance when the ingester is ready ready, which, from the perspective of the event stream, can happen as either:

  1. Add(ready, ready)

OR

  1. Add(ready, not ready)
  2. Update(ready, ready)

The logic below tries to implement that.
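A rough sketch of this rule, with illustrative types (`ClusterEvent`, `IngesterStatus`, and `should_rebalance` here are assumptions for the sake of the example, not the actual Quickwit definitions):

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Decommissioning,
}

enum ClusterEvent {
    // A node joined: (gRPC/metastore readiness, WAL status).
    Add { node_ready: bool, status: IngesterStatus },
    // A node's state changed.
    Update { node_ready: bool, prev_status: IngesterStatus, status: IngesterStatus },
    // A node left the cluster.
    Remove,
}

// Rebalance when the ingester becomes "ready ready": either Add(ready, ready),
// or Add(ready, not ready) followed by Update(ready, ready).
fn should_rebalance(event: &ClusterEvent) -> bool {
    match event {
        ClusterEvent::Add { node_ready: true, status: IngesterStatus::Ready } => true,
        ClusterEvent::Update { node_ready: true, prev_status, status: IngesterStatus::Ready } => {
            // Only fire once, when the status actually flips to Ready.
            *prev_status != IngesterStatus::Ready
        }
        // Leaving readiness (e.g. Decommissioning) also requires moving shards
        // off the node, per the PR description; this arm is a hedged guess.
        ClusterEvent::Update { prev_status: IngesterStatus::Ready, .. } => true,
        // Shards on a departed node must be reassigned as well.
        ClusterEvent::Remove => true,
        _ => false,
    }
}

fn main() {
    // Joined fully ready: rebalance.
    assert!(should_rebalance(&ClusterEvent::Add { node_ready: true, status: IngesterStatus::Ready }));
    // Joined while still loading its WAL: no rebalance yet.
    assert!(!should_rebalance(&ClusterEvent::Add { node_ready: true, status: IngesterStatus::Initializing }));
    // WAL finished loading: rebalance now.
    assert!(should_rebalance(&ClusterEvent::Update {
        node_ready: true,
        prev_status: IngesterStatus::Initializing,
        status: IngesterStatus::Ready,
    }));
}
```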

Collaborator Author

@ncoiffier-celonis Mar 6, 2026

That's my understanding too. In 678bef5, I've added some comments, improved the tests, and fixed something that I believe to be a bug: Add(ready, IngesterStatus::Initializing) should not trigger a rebalance. Not a big deal, but we would have triggered an extra rebalance for nothing.

Member

Technically, Add(ready, IngesterStatus::Initializing) should not trigger a rebalance (the ingester is not ready to ingest) but should trigger rebuilding the indexing plan (the indexer is ready to index). Today, rebalancing and rebuilding the indexing plan are coupled, so we have no way to handle this edge case perfectly, and I think that's fine if we do both when the ingester becomes ready.

}

#[tokio::test]
async fn test_wait_for_ingester_decommission_elapsed_timeout_not_zero() {
Member

👍

// Ingest docs with auto-commit. With a 5s commit timeout, these documents
// sit uncommitted in the ingesters' WAL - exactly the in-flight state we
// want to exercise during draining.
ingest(
Member

How do we know the shard for this index is always going to be created on the indexer that we're about to shut down?

Collaborator Author

Nice catch! I modified the sandbox to dynamically add nodes in e02cc5e and changed the test in 8815224 to start with a single indexer (to force shard creation there) before adding a second indexer and decommissioning the first one.

Hopefully that ensures we're creating the shard on the indexer we're decommissioning.

/// Tests that the graceful shutdown sequence works correctly in a multi-indexer
/// cluster: shutting down one indexer does NOT cause 500 errors or data loss,
/// and the cluster eventually rebalances. see #6158
#[tokio::test]
Member

Very very nice! Let's make sure this is not flaky, though. Run it 1,000 times! This is how I do it (fish):

while true
  c t --manifest-path quickwit/Cargo.toml -p quickwit-integration-tests --nocapture  -- test_graceful_shutdown_no_data_loss
end

Collaborator Author

Thank you for the suggestion; nextest has a built-in method for that (run it from the folder that contains the Cargo.toml):

cargo nextest run test_graceful_shutdown_no_data_loss --stress-count 1000 --max-fail 1

1,000 iterations will take 15s × 1,000 ≈ 4h; it's still running for me, but so far I haven't observed any failures after 100 iterations.

Member

TIL nextest can do that. Sweet.

Collaborator Author

I wasn't able to run all 1,000 iterations. The test is somewhat flaky: I've observed 1 failure in ~300 iterations. However, the build is configured with --retries 5, so it's unlikely to break the build, and I think it brings more value to keep it enabled. What do you think?

Member

Agreed. What fails?

Collaborator Author

Some kind of timeout:

stacktrace
    running 1 test
    test tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss has been running for over 60 seconds
    test tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss ... FAILED

    failures:

    failures:
        tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss

    test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 44 filtered out; finished in 62.47s

  stderr ───

    thread 'tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss' (50872832) panicked at quickwit-integration-tests/src/tests/ingest_v2_tests.rs:961:19:
    ingest during shutdown should succeed (no 500 errors): client middleware error: error sending request for url (http://127.0.0.1:64377/api/v1/test_graceful_shutdown_no_data_loss/ingest)

    Caused by:
        operation timed out

    Stack backtrace:
       0: std::backtrace::Backtrace::create
       1: anyhow::error::<impl core::convert::From<E> for anyhow::Error>::from
       2: <T as core::convert::Into<U>>::into
       3: reqwest_retry::middleware::RetryTransientMiddleware<T,R>::execute_with_retry::{{closure}}::{{closure}}
       4: core::result::Result<T,E>::map_err
       5: reqwest_retry::middleware::RetryTransientMiddleware<T,R>::execute_with_retry::{{closure}}
       6: <reqwest_retry::middleware::RetryTransientMiddleware<T,R> as reqwest_middleware::middleware::Middleware>::handle::{{closure}}
       7: <core::pin::Pin<P> as core::future::future::Future>::poll
       8: reqwest_middleware::client::ClientWithMiddleware::execute_with_extensions::{{closure}}
       9: reqwest_middleware::client::RequestBuilder::send::{{closure}}
      10: quickwit_rest_client::rest_client::Transport::send::{{closure}}
      11: quickwit_rest_client::rest_client::QuickwitClient::ingest::{{closure}}
      12: quickwit_integration_tests::test_utils::cluster_sandbox::ingest::{{closure}}
      13: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
      14: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
      15: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
      16: <core::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
      17: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
      18: <core::pin::Pin<P> as core::future::future::Future>::poll
      19: <core::pin::Pin<P> as core::future::future::Future>::poll
      20: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
      21: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
      22: tokio::runtime::scheduler::current_thread::Context::enter
      23: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
      24: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
      25: tokio::runtime::context::scoped::Scoped<T>::set
      26: tokio::runtime::context::set_scheduler::{{closure}}
      27: std::thread::local::LocalKey<T>::try_with
      28: std::thread::local::LocalKey<T>::with
      29: tokio::runtime::context::set_scheduler
      30: tokio::runtime::scheduler::current_thread::CoreGuard::enter
      31: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
      32: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
      33: tokio::runtime::context::runtime::enter_runtime
      34: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
      35: tokio::runtime::runtime::Runtime::block_on_inner
      36: tokio::runtime::runtime::Runtime::block_on
      37: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
      38: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
      39: core::ops::function::FnOnce::call_once
      40: test::__rust_begin_short_backtrace
      41: test::run_test::{{closure}}
      42: std::sys::backtrace::__rust_begin_short_backtrace
      43: core::ops::function::FnOnce::call_once{{vtable.shim}}
      44: std::sys::thread::unix::Thread::new::thread_start
      45: __pthread_cond_wait

    Stack backtrace:
       0: std::backtrace::Backtrace::create
       1: anyhow::error::<impl core::convert::From<E> for anyhow::Error>::from
       2: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
       3: quickwit_integration_tests::test_utils::cluster_sandbox::ingest::{{closure}}
       4: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
       5: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
       6: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
       7: <core::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
       8: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
       9: <core::pin::Pin<P> as core::future::future::Future>::poll
      10: <core::pin::Pin<P> as core::future::future::Future>::poll
      11: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
      12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
      13: tokio::runtime::scheduler::current_thread::Context::enter
      14: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
      15: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
      16: tokio::runtime::context::scoped::Scoped<T>::set
      17: tokio::runtime::context::set_scheduler::{{closure}}
      18: std::thread::local::LocalKey<T>::try_with
      19: std::thread::local::LocalKey<T>::with
      20: tokio::runtime::context::set_scheduler
      21: tokio::runtime::scheduler::current_thread::CoreGuard::enter
      22: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
      23: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
      24: tokio::runtime::context::runtime::enter_runtime
      25: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
      26: tokio::runtime::runtime::Runtime::block_on_inner
      27: tokio::runtime::runtime::Runtime::block_on
      28: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
      29: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
      30: core::ops::function::FnOnce::call_once
      31: test::__rust_begin_short_backtrace
      32: test::run_test::{{closure}}
      33: std::sys::backtrace::__rust_begin_short_backtrace
      34: core::ops::function::FnOnce::call_once{{vtable.shim}}
      35: std::sys::thread::unix::Thread::new::thread_start
      36: __pthread_cond_wait
    stack backtrace:
       0: __rustc::rust_begin_unwind
       1: core::panicking::panic_fmt
       2: core::result::unwrap_failed
       3: core::result::Result<T,E>::expect
       4: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
       5: <core::pin::Pin<P> as core::future::future::Future>::poll
       6: <core::pin::Pin<P> as core::future::future::Future>::poll
       7: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
       8: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
       9: tokio::runtime::scheduler::current_thread::Context::enter
      10: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
      11: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
      12: tokio::runtime::context::scoped::Scoped<T>::set
      13: tokio::runtime::context::set_scheduler::{{closure}}
      14: std::thread::local::LocalKey<T>::try_with
      15: std::thread::local::LocalKey<T>::with
      16: tokio::runtime::context::set_scheduler
      17: tokio::runtime::scheduler::current_thread::CoreGuard::enter
      18: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
      19: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
      20: tokio::runtime::context::runtime::enter_runtime
      21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
      22: tokio::runtime::runtime::Runtime::block_on_inner
      23: tokio::runtime::runtime::Runtime::block_on
      24: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
      25: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
      26: core::ops::function::FnOnce::call_once
    note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Ok((ingest_router, ingest_router_service, ingester_opt))
}

fn setup_ingester_pool(
Member

Same here, we need to be extremely careful about this convoluted logic.

Member

Now that I've thought more about this, I think we have an issue with this logic. This creates a pool of write-only ingesters, which is great for the logic in quickwit-ingest, but in quickwit-indexing, the source also holds an ingester pool, and we still want to be able to read and truncate from ingesters when they are in the retiring and decommissioning statuses. I don't think we want to create and manage distinct pools, so maybe we should only exclude initializing ingesters from this pool and push the additional filtering logic wherever it's needed (router, control plane).

Collaborator Author

I'm not sure I follow you, could you explain a bit more?

From what I can see, for the ingestion paths, we're already guarding against the ingester status:

And for the read/truncate paths, we're not guarding on the ingester status (as expected):

Are you saying that setup_ingester_pool should update the ingester pool regardless of the status? Something like 84880a1 maybe?

Member

Yeah, exactly like 84880a1. Actually, I would also not include Initializing ingesters in the pool because they're useless, but either way is fine.
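A minimal sketch of the rule settled on here, with assumed names (`update_ingester_pool`, `NodeId`, and the status enum are illustrative, not the actual Quickwit code): retiring and decommissioning ingesters stay in the pool so sources can still read and truncate from them, and only initializing ingesters are excluded.

```rust
use std::collections::HashMap;

type NodeId = String;

#[derive(Clone, Copy, PartialEq, Eq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Retiring,
    Decommissioning,
}

fn update_ingester_pool(
    pool: &mut HashMap<NodeId, IngesterStatus>,
    node_id: &str,
    status: IngesterStatus,
) {
    if status == IngesterStatus::Initializing {
        // An initializing ingester cannot serve reads or writes yet.
        pool.remove(node_id);
    } else {
        // Ready, retiring, and decommissioning ingesters all stay in the
        // pool; write-eligibility filtering happens in the router and the
        // control plane instead.
        pool.insert(node_id.to_string(), status);
    }
}

fn main() {
    let mut pool = HashMap::new();
    update_ingester_pool(&mut pool, "ingester-1", IngesterStatus::Ready);
    update_ingester_pool(&mut pool, "ingester-1", IngesterStatus::Decommissioning);
    // Still present: the read/truncate paths need it.
    assert!(pool.contains_key("ingester-1"));
    update_ingester_pool(&mut pool, "ingester-2", IngesterStatus::Initializing);
    assert!(!pool.contains_key("ingester-2"));
}
```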

@guilload guilload requested a review from nadav-govari March 4, 2026 21:46
@guilload
Member

guilload commented Mar 4, 2026

@nadav-govari, I need your eyes on this because:

  • this is a tricky PR so the more reviewers the ...
  • you will most likely have to troubleshoot and fix the bugs it will introduce
  • this will conflict with your current work
  • this will give you a cleaner way to propagate an ingester's az

@ncoiffier-celonis ncoiffier-celonis force-pushed the ingester-status-rebased branch from 81e493d to 6bebf0d Compare March 5, 2026 10:08
@ncoiffier-celonis
Collaborator Author

(I took the liberty of force-pushing after signing all the individual commits; no code changes.)

@ncoiffier-celonis ncoiffier-celonis force-pushed the ingester-status-rebased branch from 59f585c to 678bef5 Compare March 6, 2026 12:43
@ncoiffier-celonis
Collaborator Author

ncoiffier-celonis commented Mar 6, 2026

@guilload I think I've addressed all your comments, would you be able to give it a second look? Thank you!! (Feel free to close the obsolete ones)

@guilload
Member

guilload commented Mar 6, 2026

Looking good. @nadav-govari please take a look on Monday.

Collaborator

@nadav-govari left a comment

Generally looks good to me (pending the comment).

The way I see it, a lot of this logic is avoided by the changes coming in my in-development branch nadav/feature-node-based-routing. It refactors (and greatly simplifies) the routing layer to only deal with ingesters rather than with shards.

A big part of this change is broadcasting routing updates directly (rather than shard updates). Additionally, routing updates are also returned as part of every persist request; with enough load, the cluster eventually converges to having a very accurate picture of the state of other ingesters.

I believe we could accomplish what this PR is trying to do by piggybacking onto that change: add INGESTER_STATUS_DECOMMISSIONING as a persist response, and then update the ingester pool from there. We'd still need the rebalancing logic in the control plane, however, to ensure that the same shard isn't scheduled on the retiring ingester.

I'm not convinced that it's necessarily better, and this seems to work, so I think this is a reasonable way to start. We can revisit it later if we believe there's something more efficient we can do.

.filter(|ingester| !unavailable_leaders.contains(ingester))
.map(|ingester| (ingester, 0))
.filter(|(ingester_id, ingester)| {
ingester.status.is_ready() && !unavailable_leaders.contains(ingester_id)
Collaborator

I want to confirm this is correct: wouldn't we want to capture open shards on ingesters with other statuses, so that we can re-allocate them below? And then we could apply the ingester status when allocating?

Collaborator Author

From my understanding, this function is only used when opening new shards (it's called from try_open_shards); here the counts are used purely for load-balancing new allocations among eligible ingesters. Including non-ready ingesters would incorrectly make them allocation targets. Or did I misunderstand something?
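A rough sketch of the candidate-selection step being discussed (the function name and the `node_id -> is_ready` map are assumptions for illustration, not the real try_open_shards code): every ready, available ingester starts at a count of 0, so new shard allocations are load-balanced among eligible nodes only.

```rust
use std::collections::{HashMap, HashSet};

// Collect allocation candidates for new shards: an ingester qualifies only
// if its status is ready AND it is not a known-unavailable leader. The count
// of 0 is the starting point for load-balancing new allocations.
fn shard_allocation_candidates(
    ingester_ready: &HashMap<String, bool>, // node_id -> status.is_ready()
    unavailable_leaders: &HashSet<String>,
) -> HashMap<String, usize> {
    ingester_ready
        .iter()
        .filter(|(node_id, is_ready)| **is_ready && !unavailable_leaders.contains(*node_id))
        .map(|(node_id, _)| (node_id.clone(), 0))
        .collect()
}

fn main() {
    let ingesters = HashMap::from([
        ("a".to_string(), true),
        ("b".to_string(), false), // e.g. Initializing or Decommissioning
        ("c".to_string(), true),
    ]);
    let unavailable = HashSet::from(["c".to_string()]);
    let candidates = shard_allocation_candidates(&ingesters, &unavailable);
    // Only "a" is a valid target for a new shard.
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates.get("a"), Some(&0));
}
```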

Collaborator

Nope, you're right. Sorry for the confusion.

Collaborator

@nadav-govari left a comment

Thanks for making this change!

@nadav-govari nadav-govari merged commit 92a526b into quickwit-oss:main Mar 11, 2026
4 checks passed

Development

Successfully merging this pull request may close these issues.

Indexer graceful shutdown causes ingestion gap and 500 errors "no shards available"

3 participants